package com.ideal.manage.collection.controller;

import com.ideal.manage.collection.model.MSG;
import com.ideal.manage.collection.service.InboundService;
import com.ideal.manage.collection.utils.DateUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.client.RestTemplate;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Properties;

/**
 * Desc:     Data-reporting (inbound collection) service.
 * Author: Iron
 * CreateDate: 2016-10-12
 * CopyRight: Beijing Yunzong Co., Ltd.
 */
@Controller
@RequestMapping("/inbound")
public class InboundController {
    private static final Logger logger = LogManager.getLogger(InboundController.class);

    @Autowired
    private InboundService inboundService;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Reports (collects) a data payload. Supports kafka, redis, mysql backends
     * via {@code InboundService}.
     *
     * @param jsonStr raw JSON request body to hand to the inbound service
     * @author xingshen.zhao
     */
    @RequestMapping(value = "/onlineCampaignRealTime", method = RequestMethod.POST, consumes = "application/json; charset=utf-8")
    @ResponseBody
    public void onlineCampaignRealTime(@RequestBody String jsonStr) {
        logger.debug("--> come into InboundController.handleMallInbound for mall-->");
        try {
            logger.info("------> 时间" + DateUtils.getCurrentTime() + ":开始上报数据....");
            // Hand the payload to the message queue (e.g. Kafka) via the service layer.
            String retMsg = inboundService.handleEquipInbound(jsonStr);
            // Null-safe comparison: guards against the service returning null.
            if ("success".equals(retMsg)) {
                logger.info("----->上报成功!");
            }
            logger.info("------> 时间" + DateUtils.getCurrentTime() + ":上报数据成功!");
        } catch (Exception ex) {
            // Log the full stack trace at error level instead of swallowing it.
            logger.error("----->上报失败!", ex);
        }
    }

    /**
     * Starts a background Kafka consumer polling the "test" topic and returns a
     * confirmation message.
     * <p>
     * NOTE(review): each call creates a fresh {@link KafkaConsumer} and a new
     * non-daemon thread; repeated calls will accumulate consumers/threads —
     * consider guarding with a started-flag or an executor.
     *
     * @return status message ("200") confirming the consumer thread was started
     */
    @RequestMapping(value = "/start/consumer", method = RequestMethod.GET)
    @ResponseBody   // required: without it, Spring treats the MSG return value as a view name
    public MSG startConsumer() {
        logger.debug("----> come into InboundController.startConsumer -->");
        logger.info("-------------------consumer start........ -----------------------");

        Properties properties = new Properties();
        // TODO(review): broker address, group id and topic are hard-coded — move to configuration.
        properties.put("bootstrap.servers", "47.101.210.65:9092");
        // Consumer group id (shared by every consumer started through this endpoint).
        properties.put("group.id", "group-1");
        // Commit offsets automatically ...
        properties.put("enable.auto.commit", "true");
        // ... at this interval (ms).
        properties.put("auto.commit.interval.ms", "1000");
        // Start from the earliest offset when no committed offset exists for the group.
        properties.put("auto.offset.reset", "earliest");
        // Session timeout (ms) before the broker considers this consumer dead.
        properties.put("session.timeout.ms", "30000");
        // Keys and values are plain strings.
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("test"));

        // Poll in a background thread so this HTTP request returns immediately.
        CustomerRunnable customerRunnable = new CustomerRunnable(kafkaConsumer, restTemplate);
        Thread thread = new Thread(customerRunnable);
        thread.start();
        logger.info("-------------------consumer start success-----------------------");

        return new MSG("200", "--->消费者启动");
    }

}
